package gu.simplemq.mqtt;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import gu.simplemq.BaseMQDispatcher;

/**
 * MQTT分发器实现<br>
 * @author guyadong
 *
 */
class MqttSubscriber extends BaseMQDispatcher<MqttClient>implements MqttCallbackExtended  {
	private final int qos;
	MqttSubscriber(MqttPoolLazy pool) {
		super(pool);
		qos = pool.getQos();
	}

	@Override
	protected void doInit() throws Exception {
		getConnection().setCallback(this);	
	}

	@Override
	protected void doSub(String channel) throws Exception {
        getConnection().subscribe(channel, qos);

	}

	@Override
	protected void doUnsub(String channel) throws Exception {
		if(getConnection().isConnected()){
			getConnection().unsubscribe(channel);
		}
	}

	@Override
	public void connectionLost(Throwable cause) {
	}

	@Override
	public void messageArrived(String topic, MqttMessage message) throws Exception {
		String text = new String(message.getPayload(),UTF_8);
		this.dispatch(topic, text);
	}

	@Override
	public void deliveryComplete(IMqttDeliveryToken token) {
	}
	
	@Override
	public void connectComplete(boolean reconnect, String serverURI) {
		if(reconnect){
			// 连接失败重新连接上后订阅所有频道
			subscribe(allRegisteredChannels().keySet().toArray(new String[0]));
		}		
	}
}
